Перейти к основному содержимому

7.05. RabbitMQ

Разработчику Архитектору Инженеру

RabbitMQ

RabbitMQ — это популярный брокер сообщений (Message Broker), который реализует протокол AMQP. Он используется для асинхронного обмена данными между приложениями через очереди. RabbitMQ обеспечивает надёжную доставку сообщений, масштабируемость и гибкость.

Официальный сайт - https://www.rabbitmq.com/

Сообщения хранятся в очередях до тех пор, пока не будут обработаны. Производитель отправляет сообщение в очередь, а потребитель забирает его позже.

image-11.png

RabbitMQ использует модель «производитель-потребитель» с промежуточным хранилищем — очередью:

  • Producer (Производитель) отправляет сообщения в RabbitMQ, может быть любым приложением или системой;
  • Exchange (Обменник) принимает сообщения от производителя и определяет, в какую очередь поместить сообщение, исходя из правил маршрутизации;
  • Queue (Очередь) хранит сообщения до тех пор, пока они не будут обработаны потребителем.
  • Consumer (Потребитель) забирает сообщения из очереди и обрабатывает их согласно логике приложения.

RabbitMQ поддерживает несколько типов обменников (exchanges), которые определяют правила маршрутизации сообщений:

  • Direct - сообщение отправляется в очередь, которая соответствует ключу маршрутизации;
  • Fanout - сообщение рассылается во все очереди, связанные с этим обменником;
  • Topic - сообщение отправляется в очереди, соответствующие шаблону ключа маршрутизации;
  • Headers - маршрутизация основана на заголовках сообщения, а не на ключе.

Virtual Host — это изолированное пространство внутри RabbitMQ, которое позволяет разделять ресурсы (очереди, обменники, пользователи) между различными проектами или командами. Каждый Virtual Host имеет свои собственные очереди, обменники и разрешения.

Создать Virtual Host можно через консоль или веб-интерфейс. RabbitMQ предоставляет встроенный веб-интерфейс для мониторинга и управления брокером. Этот интерфейс называется RabbitMQ Management, который представляет собой плагин rabbitmq_management.

RabbitMQ Management по умолчанию доступен по адресу http://localhost:15672 под учётными данными guest/guest (логин/пароль). Интерфейс позволяет просматривать статистику (количество сообщений, скорость обработки, использование памяти), состояние очередей, обменников и соединений, создавать и удалять очереди, обменники.

Как настроить RabbitMQ?

  1. Установка Erlang на сервер — это зависимость для RabbitMQ (он работает поверх Erlang). Пример:
sudo apt update
sudo apt install erlang

  1. Установка RabbitMQ. Сначала нужно добавить официальный репозиторий RabbitMQ:
sudo apt-get install curl gnupg
curl -fsSL https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc | sudo gpg --dearmor > /usr/share/keyrings/rabbitmq-archive-keyring.gpg
echo "deb [signed-by=/usr/share/keyrings/rabbitmq-archive-keyring.gpg] https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-server/deb/ubuntu $(lsb_release -cs) main" | sudo tee /etc/apt/sources.list.d/rabbitmq.list
sudo apt update

А затем установить RabbitMQ:

sudo apt install rabbitmq-server
  1. Запуск RabbitMQ. Для этого нужно запустить службу RabbitMQ:
sudo systemctl start rabbitmq-server

Желательно также включить автозапуск RabbitMQ при загрузке системы:

sudo systemctl enable rabbitmq-server

Чтобы убедиться в корректности запуска, можно проверить статус:

sudo systemctl status rabbitmq-server
  1. Создание виртуального хоста для изоляции проектов:
sudo rabbitmqctl add_vhost my_vhost
  1. Настройка прав доступа к виртуальному хосту.

Создание пользователя:

sudo rabbitmqctl add_user my_user my_password

Назначение прав на виртуальный хост:

sudo rabbitmqctl set_permissions -p my_vhost my_user ".*" ".*" ".*"
  1. Включение плагина управления.

Активировать веб-интерфейс:

sudo rabbitmq-plugins enable rabbitmq_management

Веб-интерфейс будет доступен по адресу http://localhost:15672

  1. Подключение к RabbitMQ.

В разных языках программирования используются соответствующие клиентские библиотеки. Нужно добавить к проекту программы библиотеку, а затем:

  • создать соединение с RabbitMQ, указав хост, порт и учётные данные;
  • создать канал (логическое соединение внутри физического соединения - все операции выполняются через канал);
  • создать обменник с указанием типа;
  • создать очередь с уникальным именем;
  • связать обменник с очередью, указав ключ маршрутизации.

7.1. C# - библиотека RabbitMQ.Client.

Пример использования:

using RabbitMQ.Client;
using System.Text;

var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
// Создание очереди
channel.QueueDeclare(queue: "my_queue", durable: false, exclusive: false, autoDelete: false, arguments: null);
// Отправка сообщения
string message = "Hello, RabbitMQ!";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "", routingKey: "my_queue", basicProperties: null, body: body);
Console.WriteLine("Message sent");
}

7.2. Java - библиотека amqp-client.

Пример использования:

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class RabbitMQExample {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// Создание очереди
channel.queueDeclare("my_queue", false, false, false, null);
// Отправка сообщения
String message = "Hello, RabbitMQ!";
channel.basicPublish("", "my_queue", null, message.getBytes());
System.out.println("Message sent");
}
}
}

7.3. Python - библиотека pika.

Пример использования:

import pika

# Подключение к RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# Создание очереди
channel.queue_declare(queue='my_queue')
# Отправка сообщения
channel.basic_publish(exchange='', routing_key='my_queue', body='Hello, RabbitMQ!')
print("Message sent")
connection.close()

7.4. JavaScript (Node.js) - библиотека amqplib.

Пример использования:

const amqp = require('amqplib');
async function sendMessage() {
const connection = await amqp.connect('amqp://localhost');
const channel = await connection.createChannel();
// Создание очереди
const queue = 'my_queue';
await channel.assertQueue(queue, { durable: false });
// Отправка сообщения
const message = 'Hello, RabbitMQ!';
channel.sendToQueue(queue, Buffer.from(message));
console.log("Message sent");
setTimeout(() => {
connection.close();
}, 500);
}
sendMessage();

7.5. PHP - библиотека php-amqplib.

Пример использования:

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
// Создание очереди
$channel->queue_declare('my_queue', false, false, false, false);
// Отправка сообщения
$msg = new AMQPMessage('Hello, RabbitMQ!');
$channel->basic_publish($msg, '', 'my_queue');
echo "Message sent\n";
$channel->close();
$connection->close();

  1. Отправка и получение сообщений.

Отправка сообщения (Producer) выполняется через обменник.

Получение сообщения (Consumer) выполняется из очереди. Нужно подписаться на очередь и получить сообщение.

  1. Мониторинг. В веб-интерфейсе можно анализировать список очередей и статистику, а в разделе Exchanges можно увидеть обменники и их привязки.
  2. Масштабирование. Для масштабирования можно настроить кластер RabbitMQ. Но важно убедиться, что все узлы кластера имеют одинаковую версию RabbitMQ и Erlang.